package com.uxsino.reactorq.commons;

import java.net.URL;
import java.util.Objects;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.BlobMessage;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * A Reactive Streams {@code Subscriber<T>} that forwards every element it
 * receives to a JMS destination.
 * <p>
 * Each element is sent as a JMS text message. When the configured message class
 * is {@code String} the element is used as the text verbatim, otherwise it is
 * serialized to JSON with Jackson. Elements may also be sent together with a
 * blob attachment (requires an ActiveMQ session).
 * <p>
 * Subclasses decide how the session, destination and producer are created by
 * implementing {@link #openSession(Connection)},
 * {@link #openDestination(Session)} and {@link #openProducter(Session)}.
 * NOTE: these hooks are invoked from the constructors of this class, i.e.
 * before the subclass' own field initializers and constructor body have run.
 * <p>
 * Opening, closing and sending all synchronize on {@code this}, so the session
 * cannot be closed by one thread while another one is in the middle of a send.
 * Send failures are logged and never propagated out of {@code onNext}.
 */
public abstract class JMSSubscriber<T> implements Subscriber<T> {
    private static final Logger logger = LoggerFactory.getLogger(JMSSubscriber.class);

    // Not referenced in this class; presumably a message time-to-live of one
    // hour in milliseconds for use by subclasses -- TODO confirm unit.
    protected static final int TIME_TO_LIVE = 60 * 60 * 1000;

    protected Connection connection = null;

    protected Class<T> msgClass;

    protected Session session = null;

    protected Destination destination = null;

    protected String destinationName;

    protected MessageProducer producer;

    // most recent subscription handed to onSubscribe; used to request more elements
    private Subscription subscription;

    private final ObjectMapper mapper = new ObjectMapper();

    /**
     * Creates a subscriber for a destination identified by name and opens the
     * session immediately.
     *
     * @param conn     JMS connection, must not be {@code null}
     * @param destName name of the destination; resolving it is left to
     *                 {@link #openDestination(Session)}
     * @param cls      class of the elements; {@code String} elements are sent
     *                 verbatim, everything else is JSON encoded
     * @throws JMSException if the session, destination or producer cannot be opened
     */
    protected JMSSubscriber(Connection conn, String destName, Class<T> cls) throws JMSException {
        Objects.requireNonNull(conn);
        msgClass = cls;
        connection = conn;
        destinationName = destName;
        destination = null;

        openSession();
    }

    /**
     * Creates a subscriber for an existing destination and opens the session
     * immediately. The destination name is taken from the queue or topic; it is
     * {@code null} for any other kind of destination.
     *
     * @param conn JMS connection, must not be {@code null}
     * @param dest destination to send to, must not be {@code null}
     * @param cls  class of the elements; {@code String} elements are sent
     *             verbatim, everything else is JSON encoded
     * @throws JMSException if the destination name cannot be read or the
     *                      session, destination or producer cannot be opened
     */
    protected JMSSubscriber(Connection conn, Destination dest, Class<T> cls) throws JMSException {
        Objects.requireNonNull(conn);
        Objects.requireNonNull(dest);
        msgClass = cls;
        connection = conn;
        destination = dest;

        if (dest instanceof javax.jms.Queue) {
            destinationName = ((javax.jms.Queue) dest).getQueueName();
        } else if (dest instanceof javax.jms.Topic) {
            destinationName = ((javax.jms.Topic) dest).getTopicName();
        } else {
            destinationName = null;
        }

        openSession();
    }

    /**
     * Stores the payload text in the message: as the body of a text message,
     * or as the string property {@code "text"} for any other message type.
     */
    private void setMessageText(String s, javax.jms.Message msg) throws JMSException {
        if (msg instanceof javax.jms.TextMessage) {
            ((javax.jms.TextMessage) msg).setText(s);
        } else {
            msg.setStringProperty("text", s);
        }
    }

    /**
     * Converts an element into the text that is put on the wire.
     *
     * @return the element itself when the message class is {@code String},
     *         otherwise its JSON representation
     */
    private String encode(T t) throws JsonProcessingException {
        // String.class.equals(..) keeps this null-safe when no message class was supplied
        return String.class.equals(msgClass) ? (String) t : mapper.writeValueAsString(t);
    }

    /**
     * Re-opens the session if it has been closed. Must be called while holding
     * the monitor of {@code this}.
     */
    private void ensureSessionOpen() throws JMSException {
        if (session == null) {
            openSession();
        }
    }

    /** Asks the current subscription, if any, for one more element. */
    private void requestNext() {
        Subscription current = subscription;
        if (current != null) {
            current.request(1);
        }
    }

    /**
     * Sends one element as a text message and then requests the next element.
     * Failures (including a failed attempt to re-open the session) are logged
     * and the element is dropped.
     *
     * @param t          element to send
     * @param receiverId value for the {@code "receiverId"} string property, may be {@code null}
     * @param replyTo    JMS reply-to destination, may be {@code null}
     */
    private void sendMessage(T t, String receiverId, Destination replyTo) {
        try {
            // encode before taking the lock; serialization does not need the session
            String text = encode(t);

            // not sure if activemq producer support concurrent send
            // synchronize it for safe; the lock also keeps closeSession() from
            // tearing down the session/producer halfway through a send
            // TODO check activemq concurrent send
            synchronized (this) {
                ensureSessionOpen();
                javax.jms.TextMessage msg = session.createTextMessage();
                setMessageText(text, msg);
                if (receiverId != null) {
                    msg.setStringProperty("receiverId", receiverId);
                }
                if (replyTo != null) {
                    msg.setJMSReplyTo(replyTo);
                }
                producer.send(msg);
            }
        } catch (JMSException e) {
            logger.error("error sending text message ", e);
        } catch (JsonProcessingException e) {
            logger.error("error encoding json ", e);
        }
        requestNext();
    }

    /**
     * Sends one element together with a blob attachment and then requests the
     * next element. The element text is stored in the {@code "text"} string
     * property of the blob message. Failures are logged and the element is
     * dropped.
     *
     * @param t       element to send
     * @param replyTo JMS reply-to destination, may be {@code null}
     * @param urlBlob URL of the attachment
     */
    private void sendBlobMessage(T t, Destination replyTo, URL urlBlob) {
        try {
            String text = encode(t);

            // see sendMessage for why the whole create-and-send is synchronized
            synchronized (this) {
                ensureSessionOpen();
                if (!(session instanceof org.apache.activemq.ActiveMQSession)) {
                    throw new JMSException("blob messages require an ActiveMQ session");
                }
                BlobMessage msg = ((org.apache.activemq.ActiveMQSession) session).createBlobMessage(urlBlob);
                setMessageText(text, msg);
                if (replyTo != null) {
                    msg.setJMSReplyTo(replyTo);
                }
                producer.send(msg);
            }
        } catch (JMSException e) {
            logger.error("error sending blob message ", e);
        } catch (JsonProcessingException e) {
            logger.error("error encoding json ", e);
        }
        requestNext();
    }

    /** Creates the JMS session on the given connection. */
    protected abstract Session openSession(Connection conn) throws JMSException;

    /** Resolves or creates the destination messages are sent to. */
    protected abstract Destination openDestination(Session session) throws JMSException;

    /** Creates the producer used to send messages. */
    protected abstract MessageProducer openProducter(Session session) throws JMSException;

    /**
     * Closes producer and session. Both are closed independently and both
     * fields are always reset, so a failure while closing the producer neither
     * leaks the session nor leaves a half-closed state behind.
     */
    private synchronized void closeSession() {
        if (producer != null) {
            try {
                producer.close();
            } catch (JMSException e) {
                logger.debug("error closing producer ", e);
            } finally {
                producer = null;
            }
        }
        if (session != null) {
            try {
                session.close();
            } catch (JMSException e) {
                logger.debug("error closing session ", e);
            } finally {
                session = null;
            }
        }
    }

    /**
     * opens session AND destination, message producer. JMS supports multiple
     * destination in one session in case of reactorq, use one session per
     * destination
     * <p>
     * If any step fails, whatever has been opened so far is closed again before
     * the exception is rethrown.
     *
     * @throws JMSException if the subscriber has been disposed or one of the
     *                      open hooks fails
     */
    private void openSession() throws JMSException {
        synchronized (this) {
            if (connection == null) {
                throw new JMSException("connection has been disposed");
            }
            try {
                // fields are assigned step by step (not at the end) because the
                // subclass hooks may read the protected fields set by earlier steps
                session = openSession(connection);
                destination = openDestination(session);
                producer = openProducter(session);
            } catch (JMSException | RuntimeException e) {
                closeSession();
                throw e;
            }
        }
    }

    /**
     * Remembers the subscription and requests {@code Long.MAX_VALUE - 1}
     * elements up front; one further element is requested after every send.
     */
    @Override
    public void onSubscribe(Subscription s) {
        Objects.requireNonNull(s);
        subscription = s;
        s.request(Long.MAX_VALUE - 1);
    }

    /**
     * Sends the message. If the element implements
     * {@code IMessageWithReceiver}, its receiver id is attached as the
     * {@code "receiverId"} property.
     */
    @Override
    public void onNext(T t) {
        if (t instanceof IMessageWithReceiver) {
            sendMessage(t, ((IMessageWithReceiver) t).getReceiverId(), null);
        } else {
            sendMessage(t, null, null);
        }
    }

    /**
     * Sends the message with a JMS reply-to destination.
     *
     * @param t          element to send
     * @param receiverId value for the {@code "receiverId"} property, may be {@code null}
     * @param replyTo    a queue/topic to reply to, may be {@code null}
     */
    public void onNext(T t, String receiverId, Destination replyTo) {
        sendMessage(t, receiverId, replyTo);
    }

    /**
     * Sends the message together with a blob attachment.
     *
     * @param t           element to send
     * @param attachement URL of the attachment
     */
    public void onNextWithAttachement(T t, URL attachement) {
        sendBlobMessage(t, null, attachement);
    }

    /**
     * Sends the message together with a blob attachment and a JMS reply-to
     * destination.
     *
     * @param t           element to send
     * @param dest        a queue/topic to reply to, may be {@code null}
     * @param attachement URL of the attachment
     */
    public void onNextWithAttachement(T t, Destination dest, URL attachement) {
        sendBlobMessage(t, dest, attachement);
    }

    /** Logs the error and closes producer and session. */
    @Override
    public void onError(Throwable t) {
        logger.error(t.getMessage(), t);
        closeSession();
    }

    /** Closes producer and session. */
    @Override
    public void onComplete() {
        closeSession();
    }

    /**
     * Closes producer and session and drops the connection reference. The
     * connection itself is not closed here. Later sends fail (logged) because
     * the session can no longer be re-opened.
     */
    void dispose() {
        closeSession();

        connection = null;
    }

    /** Closes producer and session; the next send re-opens them. */
    void disposeSession() {
        closeSession();
    }

}
